MongoDB学习(四)一一MongoDB 聚合

MongoDB 聚合

MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果。有点类似sql语句中的 count(*)。

aggregate() 方法

MongoDB中聚合的方法使用aggregate()
语法如下:

1
>db.COLLECTION_NAME.aggregate(AGGREGATE_OPERATION)

举个栗子:

首先我们插入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
> db.collection.insert(
... {
... title: "shuai",
... description: "67 is shuaibi",
... by_user: "kelele67",
... url: "http://kelele67.github.io",
... tags: ["mongodb", "database", "NoSQL"],
... likes: 1000
... })
WriteResult({ "nInserted" : 1 })
> db.collection.insert(
... {
... title: "chou",
... description: "liu is choubi",
... by_user: "liu",
... url: "http://liu.github.io",
... tags: ["mongodb", "database", "NoSQL"],
... likes: 10
... })
WriteResult({ "nInserted" : 1 })
> db.collection.insert(
... {
... title: "mei",
... description: "qi is meibi",
... by_user: "qi",
... url: "http://qi.github.io",
... tags: ["mysql", "database", "SQL"],
... likes: 500
... })
WriteResult({ "nInserted" : 1 })

现在我们通过以上集合计算每个作者所写的文章数,使用aggregate()计算结果如下:

1
2
3
4
> db.collection.aggregate([{$group:{_id : "$by_user", num_tutorial : {$sum : 1}}}])
{ "_id" : "qi", "num_tutorial" : 1 }
{ "_id" : "liu", "num_tutorial" : 1 }
{ "_id" : "kelele67", "num_tutorial" : 1 }

以上实例类似sql语句: select by_user, count(*) from mycol group by by_user

在上面的例子中,我们通过字段by_user字段对数据进行分组,并计算by_user字段相同值的总和。

表达式

一些聚合的表达式:

表达式 描述 实例
$sum 计算总和 db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$sum : “$likes”}}}])
$avg 计算平均值 db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$avg : “$likes”}}}])
$min 获取集合中所有文档对应值得最小值 db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$min : “$likes”}}}])
$max 获取集合中所有文档对应值得最大值 db.mycol.aggregate([{$group : {_id : “$by_user”, num_tutorial : {$max : “$likes”}}}])
$push 在结果文档中插入值到一个数组中 db.mycol.aggregate([{$group : {_id : “$by_user”, url : {$push: “$url”}}}])
$addToSet 在结果文档中插入值到一个数组中,但不创建副本 db.mycol.aggregate([{$group : {_id : “$by_user”, url : {$addToSet : “$url”}}}])
$first 根据资源文档的排序获取第一个文档数据 db.mycol.aggregate([{$group : {_id : “$by_user”, first_url : {$first : “$url”}}}])
$last 根据资源文档的排序获取最后一个文档数据 db.mycol.aggregate([{$group : {_id : “$by_user”, last_url : {$last : “$url”}}}])

管道

管道在Unix和Linux中一般用于将当前命令的输出结果作为下一个命令的参数。
MongoDB的聚合管道将MongoDB文档在一个管道处理完毕后将结果传递给下一个管道处理。管道操作是可以重复的。
表达式:处理输入文档并输出。表达式是无状态的,只能用于计算当前聚合管道的文档,不能处理其它的文档。
这里我们介绍一下聚合框架中常用的几个操作:

  • $project:修改输入文档的结构。可以用来重命名、增加或删除域,也可以用于创建计算结果以及嵌套文档

  • $match:用于过滤数据,只输出符合条件的文档。$match使用MongoDB的标准查询操作

  • $limit:用来限制MongoDB聚合管道返回的文档数 (limit)

  • $skip:在聚合管道中跳过指定数量的文档,并返回余下的文档

  • $unwind:将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值

  • $group:将集合中的文档分组,可用于统计结果 (group by)

  • $sort:将输入文档排序后输出 (order by)

  • $geoNear:输出接近某一地理位置的有序文档

管道例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
db.candy.insert( [
{
"sweetness" : 2,
"price" : 5.0,
"pnumber" : "p003",
},{
"sweetness" : 2,
"price" : 8.0,
"pnumber" : "p002"
},{
"sweetness" : 1,
"price" : 4.0,
"pnumber" : "p002"
},{
"sweetness" : 2,
"price" : 4.0,
"pnumber" : "p001"
},{
"sweetness" : 4,
"price" : 10.0,
"pnumber" : "p003"
},{
"sweetness" : 10,
"price" : 20.0,
"pnumber" : "p001"
},{
"sweetness" : 10,
"price" : 20.0,
"pnumber" : "p003"
},{
"sweetness" : 5,
"price" : 10.0,
"pnumber" : "p002"
}
])
BulkWriteResult({
"writeErrors" : [ ],
"writeConcernErrors" : [ ],
"nInserted" : 8,
"nUpserted" : 0,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"upserted" : [ ]
})

aggregate pipeline的$group

①. $group

将集合中的文档分组,可用于统计结果,$group首先将数据根据key进行分组。
_id 是要进行分组的key,如果_id为null 相当于select count(*) from table

②. $sum

我们统计candy有几条
相当于SQL:select count(1) as count from candy

1
2
3
4
> db.candy.count()
8
> db.candy.aggregate([{$group:{_id:null, count:{$sum:1}}}])
{ "_id" : null, "count" : 8 }

我们统计一下甜度
相当于SQL:select sum(sweetness) as total from candy

1
2
> db.candy.aggregate([{$group:{_id:null, total:{$sum:"$sweetness"}}}])
{ "_id" : null, "total" : 36 }

我们通过产品类型来进行分组,然后在统计甜度和是多少
相当于SQL:select sum(sweetness) as total from candy group by pnumber

1
2
3
4
> db.candy.aggregate([{$group:{_id:"$pnumber", total:{$sum:"$sweetness"}}}])
{ "_id" : "p001", "total" : 12 }
{ "_id" : "p002", "total" : 8 }
{ "_id" : "p003", "total" : 16 }

③. $min 、 $max

我们通过相同的产品类型来进行分组,然后查询相同产品类型最甜的糖果的订单详情
相当于SQL:select max(sweetness) as sweetness from candy group by pnumber

1
2
3
4
> db.candy.aggregate([{$group:{_id:"$pnumber",max:{$max:"$sweetness"}}}])
{ "_id" : "p001", "max" : 10 }
{ "_id" : "p002", "max" : 5 }
{ "_id" : "p003", "max" : 10 }

我们通过相同的产品类型来进行分组,统计各个产品数量,然后获取最大的数量,相当于SQL: select max(t.total) from (select sum(sweetness) as total from candy group by pnumber) t

1
2
3
4
5
6
> db.candy.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$sweetness"}}}])
{ "_id" : "p001", "total" : 12 }
{ "_id" : "p002", "total" : 8 }
{ "_id" : "p003", "total" : 16 }
> db.candy.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$sweetness"}}}, {$group:{_id:null, max:{$max:"$total"}}}])
{ "_id" : null, "max" : 16 }

④. $avg

先根据$group,再计算平均值,只会针对数字进行计算,会对字符串忽略
我们通过相同的产品类型来进行分组,然后查询每个订单详情相同产品类型卖出的平均价格
相当于SQL:select avg(price) as price from items group by pnumber

1
2
3
4
> db.candy.aggregate([{$group:{_id:"$pnumber",price:{$avg:"$price"}}}])
{ "_id" : "p001", "price" : 12 }
{ "_id" : "p002", "price" : 7.333333333333333 }
{ "_id" : "p003", "price" : 11.666666666666666 }

⑤. $push

将指定的表达式的值添加到一个数组中,这个值不要超过16M,不然会出现错误
我们通过相同的产品类型来进行分组,然后查询每个相同产品的甜度放在数组里面

1
2
3
4
> db.candy.aggregate([{$group:{_id:"$pnumber",sweetnesses:{$push:"$sweetness"}}}])
{ "_id" : "p001", "sweetnesses" : [ 2, 10 ] }
{ "_id" : "p002", "sweetnesses" : [ 2, 1, 5 ] }
{ "_id" : "p003", "sweetnesses" : [ 2, 4, 10 ] }

1
2
3
4
> db.candy.aggregate([{$group:{_id:"$pnumber",sweetnesses:{$push:{sweetness:"$sweetness",price:"$price"}}}}])
{ "_id" : "p001", "sweetnesses" : [ { "sweetness" : 2, "price" : 4 }, { "sweetness" : 10, "price" : 20 } ] }
{ "_id" : "p002", "sweetnesses" : [ { "sweetness" : 2, "price" : 8 }, { "sweetness" : 1, "price" : 4 }, { "sweetness" : 5, "price" : 10 } ] }
{ "_id" : "p003", "sweetnesses" : [ { "sweetness" : 2, "price" : 5 }, { "sweetness" : 4, "price" : 10 }, { "sweetness" : 10, "price" : 20 } ] }

⑥. $addToSet

将表达式的值添加到一个数组中(无重复值),这个值不要超过16M,不然会出现错误

1
2
3
4
> db.candy.aggregate([{$group:{_id:"$pnumber",sweetnesses:{$addToSet:"$sweetness"}}}])
{ "_id" : "p001", "sweetnesses" : [ 10, 2 ] }
{ "_id" : "p002", "sweetnesses" : [ 1, 5, 2 ] }
{ "_id" : "p003", "sweetnesses" : [ 10, 4, 2 ] }

⑦. $first、 $last

$first:返回每组第一个文档,如果有排序,按照排序,如果没有按照默认的存储的顺序的第一个文档
$last:返回每组最后一个文档,如果有排序,按照排序,如果没有按照默认的存储的顺序的最后个文档

1
2
3
4
> db.candy.aggregate([{$group:{_id:"$pnumber",sweetnessFrist:{$first:"$sweetness"}}}])
{ "_id" : "p001", "sweetnessFrist" : 2 }
{ "_id" : "p002", "sweetnessFrist" : 2 }
{ "_id" : "p003", "sweetnessFrist" : 2 }

aggregate pipeline其他参数

上面我们介绍了db.candy.aggregate(pipeline, options)和它的$group基础操作,我们接下来介绍pipeline 参数和options参数的基础认识

①. $project

我们对上面的统计结果,只显示count,不想显示_id ,可以通过$project来操作,相当SQL的select 显示我们想要的字段

1
2
> db.candy.aggregate([{$group:{_id:null,count:{$sum:1}}},{$project:{"_id":0,"count":1}}])
{ "count" : 8 }

②. $match

我们想通过滤订单中,想知道甜度大于8的产品有哪些产品
相当于SQL:select sum(sweetness) as total from candy group by pnumber having total>8

1
2
3
> db.candy.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$sweetness"}}},{$match:{total:{$gt:8}}}])
{ "_id" : "p001", "total" : 12 }
{ "_id" : "p003", "total" : 16 }

如果是放在$group之前就是当做where来使用,我们只统计pnumber =p001 产品甜度之和
相当于SQL:select sum(sweetness) as total from items where pnumber=”p001”

1
2
> db.candy.aggregate([{$match:{"pnumber":"p001"}},{$group:{_id:null,total:{$sum:"$sweetness"}}}])
{ "_id" : null, "total" : 12 }

③. $skip、 $limit、$sort

db.candy.aggregate([{ $skip: 2 },{ $limit: 4 }])
是先跳过2条,再限制4条

1
2
3
4
5
> db.candy.aggregate([{ $skip: 2 },{ $limit: 4 }])
{ "_id" : ObjectId("5909d53864d6ed405ce2ad97"), "sweetness" : 1, "price" : 4, "pnumber" : "p002" }
{ "_id" : ObjectId("5909d53864d6ed405ce2ad98"), "sweetness" : 2, "price" : 4, "pnumber" : "p001" }
{ "_id" : ObjectId("5909d53864d6ed405ce2ad99"), "sweetness" : 4, "price" : 10, "pnumber" : "p003" }
{ "_id" : ObjectId("5909d53864d6ed405ce2ad9a"), "sweetness" : 10, "price" : 20, "pnumber" : "p001" }

db.candy.aggregate([{ $limit: 4 },{ $skip: 2 }])
是先限制4条,再跳过2条

1
2
3
> db.candy.aggregate([{ $limit: 4 },{ $skip: 2 }])
{ "_id" : ObjectId("5909d53864d6ed405ce2ad97"), "sweetness" : 1, "price" : 4, "pnumber" : "p002" }
{ "_id" : ObjectId("5909d53864d6ed405ce2ad98"), "sweetness" : 2, "price" : 4, "pnumber" : "p001" }

$limit、$skip、$sort、$match可以使用在阶段管道,如果使用在$group之前可以过滤掉一些数据,提高性能

④. $unwind

用于分解$push,将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值

1
2
3
4
5
6
7
8
9
10
11
12
13
> db.candy.aggregate([{$group:{_id:"$pnumber",sweetnesses:{$push:"$sweetness"}}}])
{ "_id" : "p001", "sweetness" : [ 2, 10 ] }
{ "_id" : "p002", "sweetness" : [ 2, 1, 5 ] }
{ "_id" : "p003", "sweetness" : [ 2, 4, 10 ] }
> db.candy.aggregate([{$group:{_id:"$pnumber",sweetnesses:{$push:"$sweetness"}}},{$unwind:"$sweetnesses"}])
{ "_id" : "p001", "sweetnesses" : 2 }
{ "_id" : "p001", "sweetnesses" : 10 }
{ "_id" : "p002", "sweetnesses" : 2 }
{ "_id" : "p002", "sweetnesses" : 1 }
{ "_id" : "p002", "sweetnesses" : 5 }
{ "_id" : "p003", "sweetnesses" : 2 }
{ "_id" : "p003", "sweetnesses" : 4 }
{ "_id" : "p003", "sweetnesses" : 10 }

⑤. $out

$out是pipeline最后一个阶段管道,将最后计算结果写入到指定的collection中

1
2
3
4
5
6
7
8
9
10
> db.candy.aggregate([{$group:{_id:"$pnumber",sweetnesses:{$push:"$sweetness"}}},{$unwind:"$sweetnesses"},{$project:{"_id":0,"sweetnesses":1}},{$out:"result"}])
> db.result.find()
{ "_id" : ObjectId("590af6c9cd17e3052b82395a"), "sweetnesses" : 2 }
{ "_id" : ObjectId("590af6c9cd17e3052b82395b"), "sweetnesses" : 10 }
{ "_id" : ObjectId("590af6c9cd17e3052b82395c"), "sweetnesses" : 2 }
{ "_id" : ObjectId("590af6c9cd17e3052b82395d"), "sweetnesses" : 1 }
{ "_id" : ObjectId("590af6c9cd17e3052b82395e"), "sweetnesses" : 5 }
{ "_id" : ObjectId("590af6c9cd17e3052b82395f"), "sweetnesses" : 2 }
{ "_id" : ObjectId("590af6c9cd17e3052b823960"), "sweetnesses" : 4 }
{ "_id" : ObjectId("590af6c9cd17e3052b823961"), "sweetnesses" : 10 }

⑥. $redact

插入我们的栗子文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
> db.redact.insert(
... {
... _id: 1,
... level: 1,
... status: "A",
... acct_id: "xyz123",
... cc: [{
... level: 1,
... type: "yy",
... num: 000000000000,
... exp_date: ISODate("2015-11-01T00:00:00.000Z"),
... billing_addr: {
... level: 5,
... addr1: "123 ABC Street",
... city: "Some City"
... }
... },{
... level: 3,
... type: "yy",
... num: 000000000000,
... exp_date: ISODate("2015-11-01T00:00:00.000Z"),
... billing_addr: {
... level: 1,
... addr1: "123 ABC Street",
... city: "Some City"
... }
... }]
... })

语法:{ $redact: }
$redact 跟$cond结合使用,并在$cond里面使用了if 、then、else表达式,if-else缺一不可,$redact还有三个重要的参数:

* $$DESCEND:返回包含当前document级别的所有字段,并且会继续判字段包含内嵌文档,内嵌文档的字段也会去判断是否符合条件

>level=1则值为为$$DESCEND,否则为$$PRUNE,从顶部开始扫描下去,执行$$DESCEND包含当前document级别的所有fields。当前级别字段的内嵌文档将会被继续检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
> db.redact.aggregate(
... [
... { $match: { status: "A" } },
... {
... $redact: {
... $cond: {
... if: { $eq: [ "$level", 1] },
... then: "$$DESCEND",
... else: "$$PRUNE"
... }
... }
... }
... ]
... ).pretty()
{
"_id" : 1,
"level" : 1,
"status" : "A",
"acct_id" : "xyz123",
"cc" : [
{
"level" : 1,
"type" : "yy",
"num" : 0,
"exp_date" : ISODate("2015-11-01T00:00:00Z")
}
]
}
--- * $$PRUNE:返回不包含当前文档或者内嵌文档级别的所有字段,不会继续检测此级别的其他字段,即使这些字段的内嵌文档持有相同的访问级别 >不包含当前文档或者内嵌文档级别的所有字段,不会继续检测此级别的其他字段,即使这些字段的内嵌文档持有相同的访问级别。连等级的字段都不显示,也不会去扫描等级字段包含下级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
> db.redact.aggregate(
... [
... { $match: { status: "A" } },
... {
... $redact: {
... $cond: {
... if: { $eq: [ "$level", 3] },
... then: "$$PRUNE",
... else: "$$DESCEND"
... }
... }
... }
... ]
... ).pretty()
{
"_id" : 1,
"level" : 1,
"status" : "A",
"acct_id" : "xyz123",
"cc" : [
{
"level" : 1,
"type" : "yy",
"num" : 0,
"exp_date" : ISODate("2015-11-01T00:00:00Z"),
"billing_addr" : {
"level" : 5,
"addr1" : "123 ABC Street",
"city" : "Some City"
}
}
]
}
--- * $$KEEP:返回包含当前文档或内嵌文档级别的所有字段,不再继续检测此级别的其他字段,即使这些字段的内嵌文档中持有不同的访问级别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
> db.redact.aggregate(
... [
... { $match: { status: "A" } },
... {
... $redact: {
... $cond: {
... if: { $eq: [ "$level", 1] },
... then: "$$KEEP",
... else: "$$PRUNE"
... }
... }
... }
... ]
... ).pretty()
{
"_id" : 1,
"level" : 1,
"status" : "A",
"acct_id" : "xyz123",
"cc" : [
{
"level" : 1,
"type" : "yy",
"num" : 0,
"exp_date" : ISODate("2015-11-01T00:00:00Z"),
"billing_addr" : {
"level" : 5,
"addr1" : "123 ABC Street",
"city" : "Some City"
}
},
{
"level" : 3,
"type" : "yy",
"num" : 0,
"exp_date" : ISODate("2015-11-01T00:00:00Z"),
"billing_addr" : {
"level" : 1,
"addr1" : "123 ABC Street",
"city" : "Some City"
}
}
]
}
---

aggregate pipeline options参数

explain参数

参考

Aggregation Pipeline基础篇上

Aggregation Pipeline基础篇下